Authors: Shubhi Jain (Data Engineer) and Kriti Bhardwaj (Associate Data Engineer)
This article describes how to configure Change Data Capture (CDC) in Azure SQL Database and how to use Azure Data Factory to apply the changes made in the source database to the destination database.
Prerequisites:
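- Two Azure SQL databases (source and destination). The source database, where CDC is enabled, must be on the S3 pricing tier or higher, because CDC is supported from the S3 tier upward.
- Azure Data Factory
- SQL Server Management Studio (SSMS)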
Steps:
- Prepare the source data store.
- Configure CDC (Change Data Capture) on source database and table.
- Create a data factory.
- Create linked services.
- Create source and sink datasets.
- Create, debug and run the pipeline to check for changed data.
- Modify data in the source table.
- Complete, run, and monitor the full incremental copy pipeline.
Step1: Create a data source table in Azure SQL Database
    CREATE TABLE customers (
        customer_id int,
        first_name varchar(50),
        last_name varchar(50),
        email varchar(100),
        city varchar(50),
        CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id")
    );
Step2: Enable the Change Data Capture mechanism on your database and the source table (customers) by running the following SQL query:
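    -- Enable CDC for the current database.
    EXEC sys.sp_cdc_enable_db;
    GO
    -- Enable CDC for dbo.customers. @role_name must be the keyword NULL, not the
    -- string 'null'; a string would create a gating role literally named "null".
    EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo',
        @source_name = 'customers',
        @role_name = NULL,
        @supports_net_changes = 1;
    GO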
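Before moving on, it can help to confirm that both calls took effect. The queries below are a minimal sketch that reads the catalog views; they assume the table lives in the dbo schema, as in the step above.

```sql
-- Is CDC enabled for the current database? is_cdc_enabled should be 1.
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = DB_NAME();

-- Is the customers table tracked? is_tracked_by_cdc should be 1.
SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
FROM sys.tables AS t
JOIN sys.schemas AS s ON s.schema_id = t.schema_id
WHERE s.name = 'dbo' AND t.name = 'customers';

-- Show the capture instance and its settings for dbo.customers.
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = N'dbo',
    @source_name = N'customers';
```

The capture instance reported by the last call should be named dbo_customers, which is the name the queries later in this article rely on.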
Step3: Insert data into the customers table by running the following command:
    INSERT INTO customers (customer_id, first_name, last_name, email, city)
    VALUES
        (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'),
        (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'),
        (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
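Once the capture process has picked up the inserts, they should be visible through the CDC table-valued function. The sketch below assumes the default capture instance name dbo_customers and reads everything captured so far. Capture runs periodically, so a short delay after the insert is normal; the guard avoids calling the function with an invalid LSN range while nothing has been captured yet.

```sql
-- Read all captured changes for dbo.customers, oldest first.
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_customers');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

IF @from_lsn IS NOT NULL AND @to_lsn IS NOT NULL AND @from_lsn <= @to_lsn
    SELECT __$operation, customer_id, first_name, last_name, email, city
    FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, N'all')
    ORDER BY __$start_lsn, __$seqval;
ELSE
    PRINT 'No captured changes yet; wait a few seconds and rerun.';
```

The three inserted rows should appear with __$operation = 2, the code CDC uses for inserts.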
Step4: Create a data factory.
Step5: Create a linked service in a data factory. In this section, you create linked services to both Azure SQL databases.
5.1. Open the Manage tab and click + New under Linked services.
5.2. In the New Linked Service window, select Azure SQL Database and click Continue.
5.3. In the New Linked Service window, do the following steps:
    5.3.1. Enter AzureSQLdatabaseLinkedService1 for Name.
    5.3.2. Select your database for Azure SQL database.
    5.3.3. Enter the credentials.
    5.3.4. Click Save.
(Create a second linked service in the same way for the destination database.)
Step6: Create a dataset to represent source data
6.1. In the tree view, click + (plus), and click Dataset.
6.2. Select Azure SQL Database and click Continue.
6.3. In the Set properties tab, set the dataset name and connection information:
6.4. Select the linked service name.
6.5. Select [cdc].[dbo_customers_CT] for Table name. Note: this change table was created automatically, in the cdc schema, when CDC was enabled on the customers table. Changed data is never queried from this table directly but is instead extracted through the CDC functions.
(Create a similar dataset on the other Azure SQL database to represent the data copied to the sink data store.)
Step7: Create a pipeline to copy the changed data
In this step, you create a pipeline that first uses a Lookup activity to count the changed records in the change table. An If Condition activity then checks whether that count is greater than zero and, if so, runs Copy activities that copy the inserted, updated, and deleted data separately from the source Azure SQL database to the destination Azure SQL database. Lastly, a tumbling window trigger is configured, and the start and end times will be passed to the activities as the start and end window parameters.
7.1. In the Data Factory, switch to the Edit tab. Click + (plus) in the left pane and click Pipeline.
7.2. Click on the pipeline in the tree view. In the Properties window, change the name of the pipeline to IncrementalCopyPipeline.
7.3. Drag the Lookup activity onto the pipeline designer surface. Set the name of the activity to GetChangeCount. This activity gets the number of records in the change table for a given time window.
7.4. Switch to Settings in the Properties window:
    7.4.1. Specify the SQL database dataset name for the Source Dataset field.
    7.4.2. Select the Query option and enter the following into the query box:
        DECLARE @from_lsn binary(10), @to_lsn binary(10);
        SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
        SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
        SELECT count(1) changecount
        FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    7.4.3. Enable First row only.
7.5. Click the Preview data button to ensure that the Lookup activity returns a valid output.
7.6. Drag the If Condition activity onto the pipeline designer surface. Set the name of the activity to HasChangedRows. Switch to Activities in the Properties window:
    7.6.1. Enter the following Expression:
        @greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
    7.6.2. Click on the pencil icon to edit the True condition.
7.7. Inside the True branch, go to the Activities toolbox and drag three Copy activities and two Stored procedure activities onto the pipeline designer surface. Name the Copy activities copy data_insert, copy data_delete, and copy data_update; these are the names used in the steps that follow.
7.8. Switch to the Source tab in the Properties window of copy data_insert and do the following steps:
    7.8.1. Specify the SQL Database dataset name for the Source Dataset field.
    7.8.2. Select Query for Use Query.
    7.8.3. Enter the following for Query. It selects inserted rows (__$operation = 2):
        DECLARE @from_lsn binary(10), @to_lsn binary(10);
        SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
        SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
        SELECT customer_id, first_name, last_name, email, city
        FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
        WHERE __$operation = 2
7.9. Switch to the Sink tab and select the destination Azure SQL database dataset for the Sink Dataset field.
7.10. Switch to the Source tab in the Properties window of copy data_delete and do the following steps:
    7.10.1. Specify the SQL Database dataset name for the Source Dataset field.
    7.10.2. Select Query for Use Query.
    7.10.3. Enter the following for Query. It selects deleted rows (__$operation = 1):
        DECLARE @from_lsn binary(10), @to_lsn binary(10);
        SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
        SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
        SELECT __$operation, customer_id, first_name, last_name, email, city
        FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
        WHERE __$operation = 1
7.11. Switch to the Sink tab and select the customers_staging table in the destination database for the Sink Dataset field.
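The queries in this step read from the capture instance's minimum LSN up to the current time, so every run sees all retained changes again. Since the pipeline is meant to run on a tumbling window trigger, one way to restrict a run to its own window is to map the window's start and end times to LSNs. The following is a sketch only: @window_start and @window_end are hypothetical placeholders with made-up values, and in the pipeline they would be supplied from the trigger's window start and end times rather than hard-coded.

```sql
-- Hypothetical window bounds; replace with the values passed in by the trigger.
DECLARE @window_start datetime = '2024-01-01T00:00:00';
DECLARE @window_end   datetime = '2024-01-01T01:00:00';

DECLARE @min_lsn  binary(10) = sys.fn_cdc_get_min_lsn('dbo_customers');
DECLARE @from_lsn binary(10) =
    sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @window_start);
DECLARE @to_lsn   binary(10) =
    sys.fn_cdc_map_time_to_lsn('largest less than or equal', @window_end);

-- The change function rejects a start LSN below the capture instance's minimum,
-- so clamp the start of the range to that minimum.
IF @from_lsn < @min_lsn SET @from_lsn = @min_lsn;

IF @from_lsn IS NOT NULL AND @to_lsn IS NOT NULL AND @from_lsn <= @to_lsn
    SELECT COUNT(1) AS changecount
    FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, N'all');
ELSE
    SELECT 0 AS changecount;  -- no committed changes fall inside the window
```

The same LSN range could be reused in the three copy queries so that the count and the copied rows describe the same window. Adjacent windows share their boundary instant with this choice of operators; using 'largest less than' for the end bound is one way to avoid reading a change twice.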
Step8: In SSMS, create a staging table (customers_staging) in the destination database. It is used as the sink dataset of the copy data_delete activity.
Step9: Now connect the first Stored procedure activity so that it runs after the copy data_delete activity.
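The article does not show the definition of the staging table, so the following is only a plausible sketch. The column list is an assumption: it mirrors the columns selected by the delete and update copy queries (including __$operation), with the data types taken from the customers table in Step1.

```sql
-- Hypothetical staging table for the destination database.
-- Column names and types are assumed from the copy queries and the source table.
CREATE TABLE dbo.customers_staging (
    [__$operation] int          NULL,      -- CDC operation code of the staged row
    customer_id    int          NOT NULL,
    first_name     varchar(50)  NULL,
    last_name      varchar(50)  NULL,
    email          varchar(100) NULL,
    city           varchar(50)  NULL
);
```

No primary key is declared here because the same customer_id can appear more than once if a row changed several times between two runs.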
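9.1. Switch to the Settings tab and select the linked service and stored procedure.
Step10: Create the stored procedure that this activity calls by running the following query in SSMS against the destination database. It deletes from the customers table every row whose customer_id is in the staging table, and then empties the staging table.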
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    -- Deletes from dbo.customers the rows staged by the copy data_delete activity,
    -- then empties the staging table for the next run.
    CREATE PROCEDURE [dbo].[customer_delete]
    AS
    BEGIN
        -- SET NOCOUNT ON added to prevent extra result sets from
        -- interfering with SELECT statements.
        SET NOCOUNT ON;

        DELETE FROM dbo.customers
        WHERE customer_id IN (SELECT customer_id FROM dbo.customers_staging);

        TRUNCATE TABLE dbo.customers_staging;
    END
    GO
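The procedure can be tried out in SSMS before it is wired into the pipeline. The sketch below wraps the test in a transaction and rolls it back, so the destination is left unchanged. It assumes that dbo.customers already exists in the destination database, that customers_staging has a customer_id column and that its other columns accept NULL, and it uses customer_id 3 purely as an example value.

```sql
BEGIN TRANSACTION;

-- Stage one key as if copy data_delete had written it.
INSERT INTO dbo.customers_staging (customer_id) VALUES (3);

EXEC dbo.customer_delete;

-- Both counts should be 0 if the procedure removed the row
-- and emptied the staging table.
SELECT
    (SELECT COUNT(*) FROM dbo.customers WHERE customer_id = 3) AS matching_customers,
    (SELECT COUNT(*) FROM dbo.customers_staging) AS staged_rows;

ROLLBACK TRANSACTION;  -- undo the test, including the TRUNCATE
```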
Step11: Now configure the third Copy activity, copy data_update. Switch to its Source tab and do the following steps:
11.1. Specify the SQL Database dataset name for the Source Dataset field.
11.2. Select Query for Use Query.
11.3. Enter the following for Query. It selects the new values of updated rows (__$operation = 4):
    DECLARE @from_lsn binary(10), @to_lsn binary(10);
    SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
    SELECT __$operation, customer_id, first_name, last_name, email, city
    FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    WHERE __$operation = 4
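The three copy queries differ only in the __$operation value they filter on. As a quick reference, the sketch below counts the captured changes per operation code and labels each one. It assumes the default capture instance name dbo_customers and uses the 'all update old' row filter so that both images of an update are returned; with the 'all' filter used in the pipeline queries, only code 4 appears for updates.

```sql
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_customers');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

-- Count captured changes per CDC operation code.
SELECT
    __$operation,
    CASE __$operation
        WHEN 1 THEN 'delete'
        WHEN 2 THEN 'insert'
        WHEN 3 THEN 'update (values before the change)'
        WHEN 4 THEN 'update (values after the change)'
    END AS operation_name,
    COUNT(*) AS row_count
FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, N'all update old')
GROUP BY __$operation
ORDER BY __$operation;
```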
Step12: Switch to the Sink tab and specify the customers_staging table in the destination database for the Sink Dataset field.
Step13: Now connect the second Stored procedure activity so that it runs after the copy data_update activity.
Step14: Switch to the Settings tab and select the linked service and stored procedure.
To create the stored procedure that updates the destination customers table from the staging table, run the following query in SSMS against the destination database:
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    -- Applies the rows staged by the copy data_update activity to dbo.customers,
    -- then empties the staging table for the next run.
    CREATE PROCEDURE [dbo].[customer_update]
    AS
    BEGIN
        -- SET NOCOUNT ON added to prevent extra result sets from
        -- interfering with SELECT statements.
        SET NOCOUNT ON;

        UPDATE t1
        SET t1.first_name = t2.first_name,
            t1.last_name = t2.last_name,
            t1.email = t2.email,
            t1.city = t2.city
        FROM dbo.customers t1
        JOIN dbo.customers_staging t2 ON t1.customer_id = t2.customer_id;

        TRUNCATE TABLE dbo.customers_staging;
    END
    GO
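With the pipeline complete, the remaining steps from the outline are to modify data in the source table and run the pipeline again. The statements below are an illustrative sketch to run against the source database; the new city and the fourth customer are made-up sample values, not data from the article.

```sql
-- Run against the SOURCE database to produce one change of each kind.
UPDATE dbo.customers
SET city = 'London'
WHERE customer_id = 1;           -- captured as an update (operation codes 3 and 4)

DELETE FROM dbo.customers
WHERE customer_id = 3;           -- captured as a delete (operation code 1)

INSERT INTO dbo.customers (customer_id, first_name, last_name, email, city)
VALUES (4, 'Example', 'Customer', 'example.customer@example.com', 'Leeds');
                                 -- captured as an insert (operation code 2)
```

After the next pipeline run, the destination customers table should show the new city for customer 1, no row for customer 3, and a new row for customer 4. Because the copy queries in this article start from the capture instance's minimum LSN, changes that were already copied in an earlier run are read again, which is worth keeping in mind when checking the result.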